故障转移的核心逻辑
当 promisifyServiceMethod 的错误回调被触发后,需要执行以下操作:
- 通过
GrpcClientInterface获取ConsulService实例 - 调用
updateGrpcClient方法更新故障的客户端 - 重新从 Consul 获取健康的实例
- 通过 BehaviorSubject 发布新的 gRPC 客户端
updateGrpcClient 方法实现
// src/consul/consul.service.ts
export class ConsulService {
/**
* 更新指定服务的 gRPC 客户端
* 从 Consul 获取新的健康实例并替换旧连接
*/
async updateGrpcClient(serviceName: string): Promise<void> {
// 1. 找到对应的 gRPC 客户端配置
const serviceOptions = this.findServiceOptions(serviceName);
if (!serviceOptions) {
console.error(`Service options not found: ${serviceName}`);
return;
}
// 2. 从 Consul 获取新的健康实例
const serviceInfo = await this.getServiceInfo(serviceName);
if (!serviceInfo) {
console.warn(`No healthy instance available for: ${serviceName}`);
// TODO: 尝试跨数据中心查找
return;
}
// 3. 创建新的 gRPC 客户端
const newClient = this.createGrpcClient(
serviceInfo,
serviceOptions.packageName,
serviceOptions.protoPath,
);
// 4. 找到对应的 GrpcClientInterface 并更新
const grpcClient = ConsulService.grpcClients.find(
(item) => item.serviceName === serviceName && item.consulName === this.name,
);
if (grpcClient) {
grpcClient.client.next(newClient);
console.log(`Updated gRPC client for ${serviceName}`);
}
}
/**
* 从 services 配置中查找指定服务的选项
*/
private findServiceOptions(
serviceName: string,
): ConsulServiceOptions | undefined {
if (Array.isArray(this.services)) {
return this.services.find((s) => s.serviceName === serviceName);
}
return this.services.serviceName === serviceName
? this.services
: undefined;
}
}
typescript
错误回调中触发更新
// src/modules/auth/auth.service.ts
async onModuleInit() {
const clientInterface = await this.grpcClient();
if (!clientInterface?.client) return;
clientInterface.client.subscribe((client: ClientGrpc | null) => {
if (!client) return;
this.userService = client.getService('UserService');
promisifyServiceMethods(this.userService, (error) => {
console.error('gRPC call failed, triggering failover:', error);
// 通过 GrpcClientInterface 上的 consulService 触发更新
clientInterface.consulService.updateGrpcClient(
clientInterface.serviceName,
);
});
});
}
typescript
故障转移的完整流程
微服务调用失败
↓
promisifyServiceMethod catch 捕获异常
↓
errorCallbackFunction(error) 被调用
↓
clientInterface.consulService.updateGrpcClient(serviceName)
↓
updateGrpcClient 执行:
├── findServiceOptions(serviceName) → 找到服务配置
├── getServiceInfo(serviceName) → 从 Consul 获取新实例
├── createGrpcClient(serviceInfo) → 创建新 gRPC 客户端
└── grpcClient.client.next(newClient) → 发布新客户端
↓
BehaviorSubject 发出新的 ClientGrpc
↓
AuthService 的 subscribe 回调被触发
↓
重新初始化 userService
↓
后续请求使用新的健康实例
text
跨数据中心故障转移
当前实现仅在同一数据中心内查找健康实例。如果当前数据中心没有可用实例,需要跨数据中心查找:
// TODO: 跨数据中心查找逻辑
if (!serviceInfo) {
// 遍历其他 Consul 实例
const allConsulServices = Object.values(ConsulCoreModule.consulServices);
for (const otherConsulService of allConsulServices) {
if (otherConsulService.name === this.name) continue; // 跳过当前实例
const otherServiceInfo = await otherConsulService.getServiceInfo(serviceName);
if (otherServiceInfo) {
// 在其他数据中心找到了健康实例
// 更新 gRPC 客户端指向新数据中心的实例
break;
}
}
}
typescript
设计关键点回顾
GrpcClientInterface 的完整属性设计在此刻发挥了关键作用:
| 属性 | 故障转移中的作用 |
|---|---|
client (BehaviorSubject) | 通过 next() 发布新的 gRPC 客户端 |
consulName | 确定当前属于哪个 Consul 实例 |
consulClient | 直接操作 Consul API 查询健康实例 |
serviceName | 精确定位需要更新的服务 |
consulService | 调用 updateGrpcClient 方法 |
↑